use crate::app::core::{
    ControlEvent, ControlEventDispenser, EventKind, HttpRouterOpt, HttpRouterOptTy,
};
use crate::config::IngressLister;
use anyhow::Result;
use futures::prelude::*;
use k8s_openapi::api::networking::v1::Ingress;
use kube::core::params::ListParams;
use kube::runtime::watcher::Event;
use kube::{runtime::watcher, Api, Client};
use std::sync::Arc;

pub struct IngController {
    client: Client,
    cfg: IngressLister,
}

impl IngController {
    pub fn new(client: Client, cfg: IngressLister) -> Self {
        Self { client, cfg }
    }
}
impl IngController {
    pub fn watch_ingress(self, dispenser: Arc<dyn ControlEventDispenser>) -> Result<()> {
        if !self.cfg.enable {
            return Ok(());
        }
        wd_log::log_info_ln!(
            "开始注册ingress服务监听器。。。 namespace=[{}] selector=[{}]",
            &self.cfg.namespaces.as_ref().unwrap_or(&String::new()),
            &self.cfg.selector
        );
        let api: Api<Ingress> = match self.cfg.namespaces {
            Some(s) => Api::namespaced(self.client, &s),
            None => Api::all(self.client),
        };
        let params = ListParams::default().labels(&self.cfg.selector);
        // let result = api.watch(&params, "0").await?;
        let mut watcher = watcher(api, params).boxed();
        tokio::spawn(async move {
            #[allow(irrefutable_let_patterns)]
            while let res = watcher.try_next().await {
                let event = match res {
                    Ok(o) => o,
                    Err(err) => {
                        wd_log::log_error_ln!("watch k8s cluster ingress error：{:?}", err);
                        continue;
                    }
                };
                let ingress = if let Some(s) = event {
                    wd_log::log_info_ln!("订阅到集群的ingress事件：{:?}", s);
                    s
                } else {
                    wd_log::log_info_ln!("get event is none from ingress event option");
                    continue;
                };
                let list = Self::event_ingress_to_control_event(ingress);
                for i in list.into_iter() {
                    if let Err(err) = dispenser.dispatch(i).await {
                        wd_log::log_error_ln!("ingress event dispatch error:{}", err.to_string());
                    }
                }
            }
        });
        Ok(())
    }
    fn event_ingress_to_control_event(e: Event<Ingress>) -> Vec<ControlEvent> {
        let mut list = vec![];
        match e {
            Event::Applied(ing) => {
                if let Some(ce) = Self::ingress_to_control_events(ing, false) {
                    list.push(ce);
                }
            }
            Event::Deleted(ing) => {
                if let Some(ce) = Self::ingress_to_control_events(ing, true) {
                    list.push(ce)
                }
            }
            Event::Restarted(ings) => {
                for ing in ings.into_iter() {
                    if let Some(ce) = Self::ingress_to_control_events(ing, false) {
                        list.push(ce);
                    }
                }
            }
        }
        return list;
    }
    fn ingress_to_control_events(ing: Ingress, delete: bool) -> Option<ControlEvent> {
        if ing.spec.is_none() {
            return None;
        }
        if ing.spec.as_ref().unwrap().default_backend.is_none()
            && ing.spec.as_ref().unwrap().rules.is_none()
        {
            return None;
        }
        let namespaces = ing
            .metadata
            .namespace
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or(String::new());
        let ep_name: String = {
            if let Some(ref s) = ing.metadata.labels {
                if let Some(s) = s.get("entry_point_name") {
                    s.to_string()
                } else {
                    String::from("default")
                }
            } else {
                String::from("default")
            }
        };
        let mut ce = ControlEvent::default().set_kind(EventKind::Operation(ep_name.clone()));
        let spec = ing.spec.unwrap();
        if let Some(back) = spec.default_backend {
            if delete {
                let mut hto = HttpRouterOpt::default();
                hto.opt_ty = HttpRouterOptTy::Delete;
                hto.host = format!("**{}", ep_name);
                ce = ce.push_http_router(hto);
            } else {
                if let Some(s) = back.service {
                    if let Some(port) = s.port {
                        //只处理number端口，忽略外部端口
                        if let Some(p) = port.number {
                            let mut hto = HttpRouterOpt::default();
                            hto.target_port = p as u16;
                            hto.target_host = s.name;
                            hto.host = format!("**{}", ep_name);
                            hto.namespaces = namespaces.clone();
                            hto.path = "/".to_string();
                            ce = ce.push_http_router(hto);
                        }
                    }
                }
            }
        }
        else{
            let mut hto = HttpRouterOpt::default();
            hto.opt_ty = HttpRouterOptTy::Delete;
            hto.host = format!("**{}", ep_name);
            ce = ce.push_http_router(hto);
        }
        for i in spec.rules.unwrap().into_iter() {
            if delete {
                if let Some(host) = i.host {
                    let mut hto = HttpRouterOpt::default();
                    hto.opt_ty = HttpRouterOptTy::Delete;
                    hto.host = host;
                    ce = ce.push_http_router(hto);
                }
                continue;
            }
            if i.host.as_ref().is_none() {
                continue;
            }
            if let Some(htp) = i.http {
                for path in htp.paths.into_iter() {
                    if let Some(svc) = path.backend.service {
                        if let Some(port) = svc.port {
                            if let Some(port) = port.number {
                                let mut hto = HttpRouterOpt::default();
                                hto.namespaces = namespaces.clone();
                                hto.host = i.host.as_ref().unwrap().clone();
                                hto.path = path
                                    .path
                                    .as_ref()
                                    .map(|s| s.clone())
                                    .unwrap_or(String::from("/"));
                                hto.target_port = port as u16;
                                hto.target_host = svc.name;
                                ce = ce.push_http_router(hto);
                            }
                        }
                    }
                }
            }
        }
        Some(ce)
    }
}
